Go语言并发编程

105次阅读
没有评论

共计 4274 个字符,预计需要花费 11 分钟才能阅读完成。

goroutine(go 程)的概念类似于线程,但 goroutine 是由 Go 运行时(runtime)调度和管理的。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。

go 程占用的系统资源远远小于线程,一个 go 程大约需要 4K-5K 的内存资源,一个程序可以启动大量的 go 程。

协程

在 Go 中,协程(Coroutine)被称为 goroutine,是单线程下的并发,又称微线程。

package main

import (
  "fmt"
  "time"
)

func test() {
  for i := 0; i < 10; i++ {fmt.Println(i)
    time.Sleep(time.Second)
  }
}

func main() {go test() // 开启一个协程

  for i := 100; i < 110; i++ {fmt.Println(i)
    time.Sleep(time.Second)
  }
}

主死从随

如果主线程退出,协程即使没有执行完毕,也会退出。

func test() {
  for i := 0; i < 1000; i++ {fmt.Println(i)
    time.Sleep(time.Second)
  }
}

func main() {go test() // 开启一个协程

  for i := 1000; i < 1010; i++ {fmt.Println(i)
    time.Sleep(time.Second)
  }
}

WaitGroup 用于等待一组线程结束。父线程调用 Add 方法来设定应等待的线程数量,每个被等待线程在结束时调用 Done 方法。同时,
主线程里可以调用 Wait 方法阻塞至所有线程结束,使得主线程在子协程结束后再自动结束。

var wg sync.WaitGroup // 只定义无需赋值

func main() {// wg.Add(5) // 如果知道协程数量,可以直接设置计数器
  for i := 1; i <= 5; i++ {wg.Add(1) // 协程开始,计数器加 1
    go func(n int) {defer wg.Done() // 协程结束,计数器减 1
      fmt.Println(n)
    }(i)
  }

  wg.Wait() // 主线程一直在阻塞,等待所有协程结束,直到 wg 计数器为 0}

互斥锁

多个协程操作同一数据,可能会导致不可预测的结果或者程序错误。

Mutex 为互斥锁,Lock() 加锁,Unlock() 解锁,加锁后不能再次对其进行加锁,直到对其解锁后才能再次加锁,适用于读写不确定场景,即读写次数没有明显的区别。所以说,其性能、效率相对来说比较低。

var wg sync.WaitGroup
var lock sync.Mutex
var count int

func add() {defer wg.Done()
  for i := 0; i < 100000; i++ {lock.Lock() // 加锁
    count++
    lock.Unlock() // 解锁}
}

func sub() {defer wg.Done()
  for i := 0; i < 100000; i++ {lock.Lock() // 加锁
    count--
    lock.Unlock() // 解锁}
}

func main() {wg.Add(2)
  go add()
  go sub()
  wg.Wait()
  println(count) // 0
}

读写锁

RWMutex 是一个读写锁,其经常用于读次数远远多于写次数的场景。在读的时候,数据之间不产生影响,写和读之间才会产生影响。

var wg sync.WaitGroup
var lock sync.RWMutex

func read() {defer wg.Done()
  lock.RLock()
  println("read start")
  time.Sleep(time.Second)
  println("read stop")
  lock.RUnlock()}

func write() {defer wg.Done()
  lock.Lock()
  println("write start")
  time.Sleep(time.Second * 2)
  println("write stop")
  lock.Unlock()}

func main() {wg.Add(7)
  for i := 0; i < 5; i++ {go read()
  }
  go write()
  go write()

  wg.Wait()}

原子操作

加锁操作性能开销大,原子操作性能由于加锁操作。

package main

import (
  "fmt"
  "sync"
  "sync/atomic"
  "time"
)

var x int64
var l sync.Mutex
var wg sync.WaitGroup

func mutexAdd() {l.Lock()
  x++
  l.Unlock()
  wg.Done()}

func atomicAdd() {atomic.AddInt64(&x, 1)
  wg.Done()}

func main() {start := time.Now()
  for i := 0; i < 1000000; i++ {wg.Add(1)
    // go mutexAdd() // 加锁版 add 函数
    go atomicAdd() // 原子操作版 add 函数}
  wg.Wait()

  end := time.Now()
  fmt.Println(x)
  fmt.Println(end.Sub(start))
}

提前退出 go 程

func main() {go func() {func() {println(" 子 go 程内部函数 ")
      // return // 退出当前函数
      // os.Exit(-1) // 退出整个程序
      runtime.Goexit() // 退出当前 go 程}()

    println(" 子 go 程退出 ")
  }()

  println(" 主 go 程 ")
  time.Sleep(time.Second)
  println(" 主 go 程退出 ")
}

runtime 包

runtime.Gosched() 让出 CPU 时间片,重新等待安排任务:

func main() {go func(s string) {
    for i := 0; i < 3; i++ {fmt.Println(s)
    }
  }("world")

  for i := 0; i < 3; i++ {runtime.Gosched() // 切一下,再次分配任务
    fmt.Println("hello")
  }
}

runtime.GOMAXPROCS():
Go 运行时的调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码,默认值是机器上的 CPU 核心数,通过 runtime.GOMAXPROCS() 函数可设置当前程序并发时占用的 CPU 逻辑核心数。

Go 1.5 版本之前,默认使用的是单核心执行。Go 1.5 版本之后,默认使用全部的 CPU 逻辑核心数,可以通过将任务分配到不同的 CPU 逻辑核心上实现并行的效果。

管道

管道(channel)本质是一个数据结构 - 队列。数据是先进先出的,多协程访问时,不需要加锁,channel 本身就是线程安全的。管道是有类型的,一个 string 管道只能存放 string 类型数据。

WaitGroup、Mutex、Cond 是传统同步机制,可以使用管道来等待 goroutine 结束。

func main() {
  // 定义一个 int 类型的管道
  var intChan chan int
  intChan = make(chan int, 3) // 初始化管道,容量为 3
  intChan <- 1                // 向管道中写入数据
  intChan <- 2

  close(intChan) // 管道关闭后,不能再写入数据,但仍然可以读取数据

  n1 := <-intChan // 从管道中读取数据
  n2 := <-intChan

  println(n1, n2)
}

默认情况下,管道是双向的,即可读可写。若想让管道只写:var intChan chan<- int,只读:var intChan <-chan int

func producer(out chan<- int) {
  for i := 0; i < 10; i++ {
    out <- i
    println(" 生产 ", i)
  }
}

func consumer(in <-chan int) {
  for i := range in {println(" 消费 ", i)
  }
}

func main() {numChan := make(chan int, 5)
  go producer(numChan) // 双向管道可以赋值给同类型单向管道,反之不行
  go consumer(numChan)
  time.Sleep(time.Second * 2)
}

管道遍历

func main() {
  var intChan chan int
  intChan = make(chan int, 100)
  for i := 0; i < 100; i++ {intChan <- i}

  // 遍历前要关闭管道,否则会出现死锁(for range 会一直等待)close(intChan)
  for v := range intChan {println("value:", v)
  }
}

select

解决多个管道的选择问题,也可以叫做多路复用,可以从多个管道中随机公平地选择一个来执行。case 后面必须进行的是 io 操作,不能是等值,随机去选择一个 io 操作。防止 select 被阻塞住,加入 default。

func main() {intChan := make(chan int, 1)
  go func() {time.Sleep(time.Second * 10)
    intChan <- 10
  }()

  strChan := make(chan string, 1)
  go func() {time.Sleep(time.Second * 15)
    strChan <- "hello"
  }()

  select {
  case v := <-intChan:
    println("int value:", v)
  case v := <-strChan:
    println("string value:", v)
  default:
    println("no value received")
  }
}

无缓冲管道

使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道。

func recv(c chan int) {
  ret := <-c // 接收数据
  fmt.Println("recv:", ret)
}

func main() {ch := make(chan int)
  go recv(ch) // 启用 goroutine 从管道接收数据
  ch <- 100   // 向管道发送数据
  fmt.Println("main: send 100 to channel")
}

有缓冲管道

只要管道容量大于零,那么该管道就是有缓冲管道,管道容量表示管道中能存放元素的数量。

管道总结

  • 当管道写满了,写阻塞
  • 当缓冲区读完了,读阻塞
  • 从 nil 管道读取、写入数据,都会阻塞,不会崩溃
  • 从一个已经 close 的管道读取数据,返回零值,不会崩溃
  • 向一个已经 close 的管道写数据,会崩溃
  • 关闭一个已经 close 的管道,会崩溃
  • 读写次数,一定要对等

正文完
 0
三毛笔记
版权声明:本站原创文章,由 三毛笔记 于2024-02-22发表,共计4274字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)